package com.shiku.imserver.repository.runnable;

import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.shiku.imserver.common.message.ChatMessage;
import java.util.Collections;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.bson.Document;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers chat messages as BSON documents, grouped by target collection name, and writes each
 * buffered group to MongoDB when {@link #runTask()} is invoked.
 *
 * <p>Producers call {@link #putMessageToTask(ChatMessage)}. The pending documents live in the
 * {@code maps} field and are registered through {@code addMsg}; collection names come from
 * {@code getCollectionName}. None of those three members is declared in this class
 * (presumably they are inherited from {@link BaseRepositoryMapRunnable} -- confirm there).
 *
 * <p>Every read or write of the pending lists performed by this class happens while holding the
 * write lock of {@link #lock}.
 */
public class ChatMessageDBRunnable extends BaseRepositoryMapRunnable<List<Document>> {
  /** Database that holds the chat message collections. */
  private final MongoDatabase chatMsgDB;

  /** Cache of collection handles keyed by collection name; filled by {@link #getMongoCollection}. */
  protected ConcurrentHashMap<String, MongoCollection<Document>> dbMaps = new ConcurrentHashMap<>();

  /** Guards the pending document lists; only the write lock is used by this class. */
  protected ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

  private static final Logger log = LoggerFactory.getLogger(ChatMessageDBRunnable.class);

  /**
   * Creates a runnable that persists messages into the given database.
   *
   * @param chatMsgDB database used to resolve the target collections
   */
  public ChatMessageDBRunnable(MongoDatabase chatMsgDB) {
    this.chatMsgDB = chatMsgDB;
  }

  /**
   * Returns the collection handle for {@code collectionName}, resolving it from the database on
   * first use and serving it from {@link #dbMaps} afterwards.
   *
   * @param collectionName name of the MongoDB collection
   * @return the collection handle for that name
   */
  public MongoCollection<Document> getMongoCollection(String collectionName) {
    // The original looked the name up in dbMaps but never stored the result, so the cache was
    // always empty; computeIfAbsent both resolves and remembers the handle atomically.
    return this.dbMaps.computeIfAbsent(collectionName, name -> this.chatMsgDB.getCollection(name));
  }

  /**
   * Converts {@code message} into two documents and queues them for the next {@link #runTask()}.
   *
   * <p>The first document has {@code direction} 0 and is queued under the collection name derived
   * from the sender's user id. The second is a copy with {@code direction} 1 and swapped
   * {@code sender}/{@code receiver}. When sender and receiver user ids differ, the copy is queued
   * under the collection name derived from the receiver's user id; when they are equal, both
   * documents are queued under the sender's collection name and {@code sender}/{@code receiver}
   * are taken from the message head instead of the user ids.
   *
   * <p>If a collection name cannot be derived, the failure is logged and the method returns
   * without queuing the affected document(s).
   *
   * @param message the chat message to persist
   */
  public void putMessageToTask(ChatMessage message) {
    log.info("[ww] =======putMessageToTask=====");
    Document dbObj = new Document();
    try {
      dbObj.put("message", message.toString());
      dbObj.put("direction", Integer.valueOf(0));
      if (!message.getFromUserId().equals(message.getToUserId())) {
        dbObj.put("sender", message.getFromUserId());
        dbObj.put("receiver", message.getToUserId());
      } else {
        dbObj.put("sender", message.getMessageHead().getFrom());
        dbObj.put("receiver", message.getMessageHead().getTo());
      }
      dbObj.put("sender_jid", message.getMessageHead().getFrom());
      dbObj.put("receiver_jid", message.getMessageHead().getTo());
      dbObj.put("ts", Long.valueOf(System.currentTimeMillis()));
      dbObj.put("contentType", Short.valueOf(message.getType()));
      dbObj.put("messageId", message.getMessageHead().getMessageId());
      dbObj.put("timeSend", Long.valueOf(message.getTimeSend()));
      dbObj.put("deleteTime", Long.valueOf(message.getDeleteTime()));
      dbObj.put("isRead", Integer.valueOf(0));
      dbObj.put("isEncrypt", Boolean.valueOf(message.isEncrypt()));
      if (null != message.getContent()) {
        dbObj.put("content", message.getContent());
      }
    } catch (Exception e) {
      // Best effort, as before: processing continues with whatever fields were set, but the
      // failure now goes through the logger instead of printStackTrace().
      log.error("failed to populate chat message document", e);
    }

    String key = null;
    try {
      key = getCollectionName(message.getFromUserId());
    } catch (NumberFormatException e) {
      log.error("error {} ===> {} ", e.getMessage(), message.toString());
      return;
    } catch (Exception e) {
      log.error(e.getMessage(), e);
      return;
    }

    // Independent copy of the sender-side document, produced via a JSON round trip.
    Document toDbObj = Document.parse(dbObj.toJson());
    toDbObj.replace("direction", Integer.valueOf(1));

    if (!message.getFromUserId().equals(message.getToUserId())) {
      toDbObj.replace("sender", message.getToUserId());
      toDbObj.replace("receiver", message.getFromUserId());
      // Acquire the lock before entering try so the finally block can never call unlock()
      // on a lock this thread does not hold.
      this.lock.writeLock().lock();
      try {
        List<Document> list = (List<Document>) this.maps.get(key);
        if (null == list) {
          list = Collections.synchronizedList(new LinkedList<>());
        }
        list.add(dbObj);
        addMsg(key, list);

        String tokey = null;
        try {
          tokey = getCollectionName(message.getToUserId());
        } catch (NumberFormatException e) {
          log.error("error {} ===> {} ", e.getMessage(), message.toString());
          return;
        } catch (Exception e) {
          log.error(e.getMessage(), e);
          return;
        }
        List<Document> tolist = (List<Document>) this.maps.get(tokey);
        if (null == tolist) {
          tolist = Collections.synchronizedList(new LinkedList<>());
        }
        tolist.add(toDbObj);
        addMsg(tokey, tolist);
      } catch (Exception e) {
        log.error("failed to queue chat message documents", e);
      } finally {
        this.lock.writeLock().unlock();
      }
    } else {
      toDbObj.put("sender", message.getMessageHead().getTo());
      toDbObj.put("receiver", message.getMessageHead().getFrom());
      this.lock.writeLock().lock();
      try {
        List<Document> list = (List<Document>) this.maps.get(key);
        if (null == list) {
          list = Collections.synchronizedList(new LinkedList<>());
        }
        list.add(dbObj);
        list.add(toDbObj);
        addMsg(key, list);
      } catch (Exception e) {
        log.error("failed to queue chat message documents", e);
      } finally {
        this.lock.writeLock().unlock();
      }
    }
  }

  /**
   * Flushes the pending documents: for every entry in {@code maps}, a non-empty list is written
   * with {@code insertMany} to the collection named by the entry key and then cleared, while an
   * empty list has its entry removed from the map.
   *
   * <p>A failure while handling one entry is logged and the loop moves on to the next entry; the
   * failed list is not cleared. A failure while advancing the iterator ends the current run.
   */
  public void runTask() {
    try {
      Iterator<Map.Entry<String, List<Document>>> iterator = this.maps.entrySet().iterator();
      while (iterator.hasNext()) {
        // Advance outside the per-entry try: if the iterator itself fails, the outer catch ends
        // this run instead of retrying the same failing step forever.
        Map.Entry<String, List<Document>> entry = iterator.next();
        // Lock before try so unlock() in finally always matches a successful lock().
        this.lock.writeLock().lock();
        try {
          List<Document> list = entry.getValue();
          if (!list.isEmpty()) {
            getMongoCollection(entry.getKey()).insertMany(list);
            list.clear();
          } else {
            iterator.remove();
          }
        } catch (Exception e) {
          log.error(e.toString(), e);
        } finally {
          this.lock.writeLock().unlock();
        }
      }
    } catch (Exception e) {
      log.error(e.toString(), e);
    }
  }
}
